package com.alinesno.cloud.compoment.kafka.config;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties ;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;


/**
 * kafka消费者配置
 * @author LuoAnDong
 * @since 2019年4月9日 上午11:20:24
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

	private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class) ; 

	@Autowired
	private KafkaProperties KafkaProperties ; 
	
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String,Object> props = KafkaProperties.buildConsumerProperties() ;
        
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
    	log.debug("初始化客户端kafka配置:{}" , KafkaProperties);
        
        return props ; 

    }

    /**
     * 手动提交
     * @return
     */
	@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(manualConsumerFactory());
        ContainerProperties props = factory.getContainerProperties();
        
		props.setAckMode(AckMode.MANUAL_IMMEDIATE) ; 
		props.setIdleEventInterval(100L);
		props.setPollTimeout(1000L);
        
        factory.setAckDiscarded(true);
        
        factory.setRecoveryCallback(new RecoveryCallback<Void>() {
            @Override
            public Void recover(RetryContext context) throws Exception {
                return null;
            }
        });

        return factory;
    }
	
    @Bean
    public ConsumerFactory<String, String> manualConsumerFactory() {
        Map<String, Object> configs = consumerConfigs();
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

}